[[retry-config]]
= Configuration

Starting with version 2.9, for default configuration, the `@EnableKafkaRetryTopic` annotation should be used in a `@Configuration` annotated class.
This enables the feature to bootstrap properly and gives access to injecting some of the feature's components to be looked up at runtime.

NOTE: It is not necessary to also add `@EnableKafka`, if you add this annotation, because `@EnableKafkaRetryTopic` is meta-annotated with `@EnableKafka`.

Also, starting with that version, for more advanced configuration of the feature's components and global features, the `RetryTopicConfigurationSupport` class should be extended in a `@Configuration` class, and the appropriate methods overridden.
For more details refer to xref:retrytopic/retry-config.adoc#retry-topic-global-settings[Configuring Global Settings and Features].

By default, the containers for the retry topics will have the same concurrency as the main container.
Starting with version 3.0, you can set a different `concurrency` for the retry containers (either on the annotation, or in `RetryConfigurationBuilder`).

IMPORTANT: Only one of the above techniques can be used, and only one `@Configuration` class can extend `RetryTopicConfigurationSupport`.

[[using-the-retryabletopic-annotation]]
== Using the `@RetryableTopic` annotation

To configure the retry topic and dlt for a `@KafkaListener` annotated method, you just have to add the `@RetryableTopic` annotation to it and Spring for Apache Kafka will bootstrap all the necessary topics and consumers with the default configurations.

[source, java]
----
@RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
@KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
public void processMessage(MyPojo message) {
    // ... message processing
}
----

Since 3.2, `@RetryableTopic` support for @KafkaListener on a class would be:
[source,java]
----
@RetryableTopic(listenerContainerFactory = "my-retry-topic-factory")
@KafkaListener(topics = "my-annotated-topic")
public class ClassLevelRetryListener {

    @KafkaHandler
    public void processMessage(MyPojo message) {
        // ... message processing
    }

}
----

You can specify a method in the same class to process the dlt messages by annotating it with the `@DltHandler` annotation.
If no DltHandler method is provided a default consumer is created which only logs the consumption.

[source, java]
----
@DltHandler
public void processMessage(MyPojo message) {
    // ... message processing, persistence, etc
}
----

NOTE: If you don't specify a kafkaTemplate name a bean with name `defaultRetryTopicKafkaTemplate` will be looked up.
If no bean is found an exception is thrown.

Starting with version 3.0, the `@RetryableTopic` annotation can be used as a meta-annotation on custom annotations; for example:

[source, java]
----
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@RetryableTopic
static @interface MetaAnnotatedRetryableTopic {

    @AliasFor(attribute = "concurrency", annotation = RetryableTopic.class)
    String parallelism() default "3";

}
----

[[using-retrytopicconfiguration-beans]]
== Using `RetryTopicConfiguration` beans

You can also configure the non-blocking retry support by creating `RetryTopicConfiguration` beans in a `@Configuration` annotated class.

[source, java]
----
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .create(template);
}
----

This will create retry topics and a dlt, as well as the corresponding consumers, for all topics in methods annotated with `+++@+++KafkaListener` using the default configurations. The `KafkaTemplate` instance is required for message forwarding.

To achieve more fine-grained control over how to handle non-blocking retrials for each topic, more than one `RetryTopicConfiguration` bean can be provided.

[source, java]
----
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(3000)
            .maxAttempts(5)
            .concurrency(1)
            .includeTopics("my-topic", "my-other-topic")
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .exponentialBackoff(1000, 2, 5000)
            .maxAttempts(4)
            .excludeTopics("my-topic", "my-other-topic")
            .retryOn(MyException.class)
            .create(template);
}
----

NOTE: The retry topics' and dlt's consumers will be assigned to a consumer group with a group id that is the combination of the one which you provide in the `groupId` parameter of the `@KafkaListener` annotation with the topic's suffix.
If you don't provide any they'll all belong to the same group, and rebalance on a retry topic will cause an unnecessary rebalance on the main topic.

IMPORTANT: If the consumer is configured with an xref:kafka/serdes.adoc#error-handling-deserializer[`ErrorHandlingDeserializer`], to handle deserialization exceptions, it is important to configure the `KafkaTemplate` and its producer with a serializer that can handle normal objects as well as raw `byte[]` values, which result from deserialization exceptions.
The generic value type of the template should be `Object`.
One technique is to use the `DelegatingByTypeSerializer`; an example follows:

[source, java]
----
@Bean
public ProducerFactory<String, Object> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
        new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
               MyNormalObject.class, new JsonSerializer<Object>())));
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
----

IMPORTANT: Multiple `@KafkaListener` annotations can be used for the same topic with or without manual partition assignment along with non-blocking retries, but only one configuration will be used for a given topic.
It's best to use a single `RetryTopicConfiguration` bean for configuration of such topics; if multiple `@RetryableTopic` annotations are being used for the same topic, all of them should have the same values, otherwise one of them will be applied to all of that topic's listeners and the other annotations' values will be ignored.

[[retry-topic-global-settings]]
== Configuring Global Settings and Features

Since 2.9, the previous bean overriding approach for configuring components has been removed (without deprecation, due to the aforementioned experimental nature of the API).
This does not change the `RetryTopicConfiguration` beans approach - only infrastructure components' configurations.
Now the `RetryTopicConfigurationSupport` class should be extended in a (single) `@Configuration` class, and the proper methods overridden.
An example follows:

[source, java]
----

@EnableKafka
@Configuration
public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {

    @Override
    protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
        blockingRetries
                .retryOn(MyBlockingRetriesException.class, MyOtherBlockingRetriesException.class)
                .backOff(new FixedBackOff(3000, 3));
    }

    @Override
    protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
        nonBlockingFatalExceptions.add(MyNonBlockingException.class);
    }

    @Override
    protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
        // Use the new 2.9 mechanism to avoid re-fetching the same records after a pause
        customizersConfigurer.customizeErrorHandler(eh -> {
            eh.setSeekAfterError(false);
        });
    }

}
----

IMPORTANT: When using this configuration approach, the `@EnableKafkaRetryTopic` annotation should not be used to prevent context failing to start due to duplicated beans.
Use the simple `@EnableKafka` annotation instead.

When `autoCreateTopics` is true, the main and retry topics will be created with the specified number of partitions and replication factor.
Starting with version 3.0, the default replication factor is `-1`, meaning using the broker default.
If your broker version is earlier than 2.4, you will need to set an explicit value.
To override these values for a particular topic (e.g. the main topic or DLT), simply add a `NewTopic` `@Bean` with the required properties; that will override the auto creation properties.

IMPORTANT: By default, records are published to the retry topic(s) using the original partition of the received record.
If the retry topics have fewer partitions than the main topic, you should configure the framework appropriately; an example follows.

[source, java]
----
@EnableKafka
@Configuration
public class Config extends RetryTopicConfigurationSupport {

    @Override
    protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory() {
        return dlprf -> dlprf.setPartitionResolver((cr, nextTopic) -> null);
    }

    ...

}
----

The parameters to the function are the consumer record and the name of the next topic.
You can return a specific partition number, or `null` to indicate that the `KafkaProducer` should determine the partition.

By default, all values of retry headers (number of attempts, timestamps) are retained when a record transitions through the retry topics.
Starting with version 2.9.6, if you want to retain just the last value of these headers, use the `configureDeadLetterPublishingContainerFactory()` method shown above to set the factory's `retainAllRetryHeaderValues` property to `false`.

[[find-retry-topic-config]]
== Find RetryTopicConfiguration
Attempts to provide an instance of `RetryTopicConfiguration` by either creating one from a `@RetryableTopic` annotation, or from the bean container if no annotation is available.

If beans are found in the container, there's a check to determine whether the provided topics should be handled by any of such instances.

If `@RetryableTopic` annotation is provided, a `DltHandler` annotated method is looked up.

since 3.2, provide new API to Create `RetryTopicConfiguration` when `@RetryableTopic` annotated on a class:

[source, java]
----
@Bean
public RetryTopicConfiguration myRetryTopic() {
    RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory);
    return provider.findRetryConfigurationFor(topics, null, AnnotatedClass.class, bean);
}

@RetryableTopic
public static class AnnotatedClass {
    // NoOps
}
----